iT邦幫忙

2025 iThome 鐵人賽

0

小明的數位通訊系統

今天是我們系列的最後一天!我們要為量化交易系統加上 Telegram 通知功能,這就像為農場安裝現代化的通訊系統一樣。不管我身在何處,都能即時收到農場的重要訊息 - 交易執行、系統狀態、盈虧變化等等!

Telegram Bot 整合架構

系統通知架構

系統通知架構

Telegram Bot 實作

1. Bot 核心功能

# src/notifications/telegram_bot.py
import asyncio
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import aiohttp
import json
from dataclasses import dataclass

@dataclass
class TelegramConfig:
    bot_token: str
    chat_id: str
    rate_limit: int = 30  # 每分鐘最多30條訊息
    retry_attempts: int = 3
    retry_delay: int = 5

class TelegramBot:
    """Telegram 機器人通知系統"""
    
    def __init__(self, config: TelegramConfig):
        self.config = config
        self.base_url = f"https://api.telegram.org/bot{config.bot_token}"
        self.session = None
        self.message_queue = asyncio.Queue()
        self.rate_limiter = RateLimiter(config.rate_limit)
        self.logger = logging.getLogger(__name__)
        
    async def start(self):
        """啟動 Telegram Bot"""
        self.session = aiohttp.ClientSession()
        
        # 驗證 Bot Token
        bot_info = await self.get_bot_info()
        if bot_info:
            self.logger.info(f"Telegram Bot started: {bot_info['username']}")
            
            # 啟動訊息處理任務
            asyncio.create_task(self._message_processor())
        else:
            raise Exception("Failed to start Telegram Bot")
    
    async def get_bot_info(self):
        """獲取 Bot 資訊"""
        try:
            async with self.session.get(f"{self.base_url}/getMe") as response:
                if response.status == 200:
                    data = await response.json()
                    return data['result']
                else:
                    self.logger.error(f"Failed to get bot info: {response.status}")
                    return None
        except Exception as e:
            self.logger.error(f"Error getting bot info: {e}")
            return None
    
    async def send_message(self, message: str, parse_mode: str = "HTML", 
                          disable_web_page_preview: bool = True,
                          priority: str = "normal"):
        """發送訊息到隊列"""
        
        message_data = {
            'text': message,
            'parse_mode': parse_mode,
            'disable_web_page_preview': disable_web_page_preview,
            'priority': priority,
            'timestamp': datetime.now()
        }
        
        await self.message_queue.put(message_data)
    
    async def _message_processor(self):
        """訊息處理器"""
        
        while True:
            try:
                # 等待訊息
                message_data = await self.message_queue.get()
                
                # 檢查速率限制
                await self.rate_limiter.wait()
                
                # 發送訊息
                await self._send_telegram_message(message_data)
                
                # 標記任務完成
                self.message_queue.task_done()
                
            except Exception as e:
                self.logger.error(f"Error in message processor: {e}")
                await asyncio.sleep(1)
    
    async def _send_telegram_message(self, message_data: Dict):
        """實際發送 Telegram 訊息"""
        
        url = f"{self.base_url}/sendMessage"
        payload = {
            'chat_id': self.config.chat_id,
            'text': message_data['text'],
            'parse_mode': message_data['parse_mode'],
            'disable_web_page_preview': message_data['disable_web_page_preview']
        }
        
        for attempt in range(self.config.retry_attempts):
            try:
                async with self.session.post(url, json=payload) as response:
                    if response.status == 200:
                        self.logger.debug("Message sent successfully")
                        return
                    else:
                        error_text = await response.text()
                        self.logger.warning(f"Failed to send message: {response.status} - {error_text}")
                        
            except Exception as e:
                self.logger.error(f"Error sending message (attempt {attempt + 1}): {e}")
                
                if attempt < self.config.retry_attempts - 1:
                    await asyncio.sleep(self.config.retry_delay)
    
    async def stop(self):
        """停止 Bot"""
        if self.session:
            await self.session.close()

class RateLimiter:
    """速率限制器"""
    
    def __init__(self, max_messages_per_minute: int):
        self.max_messages = max_messages_per_minute
        self.message_times = []
    
    async def wait(self):
        """等待速率限制"""
        now = datetime.now()
        
        # 清理超過一分鐘的記錄
        self.message_times = [
            msg_time for msg_time in self.message_times 
            if now - msg_time < timedelta(minutes=1)
        ]
        
        # 如果超過限制,等待
        if len(self.message_times) >= self.max_messages:
            wait_time = 60 - (now - self.message_times[0]).total_seconds()
            if wait_time > 0:
                await asyncio.sleep(wait_time)
        
        # 記錄這次發送時間
        self.message_times.append(now)

2. 訊息模板系統

# src/notifications/message_templates.py
from datetime import datetime
from typing import Dict, Any
import emoji

class MessageTemplates:
    """訊息模板管理"""
    
    @staticmethod
    def trading_signal(signal_data: Dict) -> str:
        """交易信號通知"""
        
        symbol = signal_data.get('symbol', 'Unknown')
        action = signal_data.get('action', 'Unknown')
        price = signal_data.get('price', 0)
        size = signal_data.get('size', 0)
        strategy = signal_data.get('strategy', 'Unknown')
        
        action_emoji = {
            'buy': '🟢',
            'sell': '🔴',
            'close': '⚫'
        }.get(action.lower(), '🔵')
        
        return f"""
{action_emoji} <b>交易信號</b>

📊 <b>標的:</b>{symbol}
🎯 <b>動作:</b>{action.upper()}
💰 <b>價格:</b>${price:,.2f}
📏 <b>數量:</b>{size}
🤖 <b>策略:</b>{strategy}
🕒 <b>時間:</b>{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
"""
    
    @staticmethod
    def order_executed(order_data: Dict) -> str:
        """訂單執行通知"""
        
        symbol = order_data.get('symbol', 'Unknown')
        side = order_data.get('side', 'Unknown')
        size = order_data.get('size', 0)
        price = order_data.get('price', 0)
        order_id = order_data.get('order_id', 'Unknown')
        
        side_emoji = '🟢' if side.lower() == 'buy' else '🔴'
        
        return f"""
{side_emoji} <b>訂單執行成功</b>

📋 <b>訂單ID:</b>{order_id}
📊 <b>標的:</b>{symbol}
📈 <b>方向:</b>{side.upper()}
💰 <b>價格:</b>${price:,.2f}
📏 <b>數量:</b>{size}
💵 <b>總值:</b>${price * size:,.2f}
🕒 <b>時間:</b>{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
"""
    
    @staticmethod
    def pnl_update(pnl_data: Dict) -> str:
        """損益更新通知"""
        
        total_pnl = pnl_data.get('total_pnl', 0)
        daily_pnl = pnl_data.get('daily_pnl', 0)
        unrealized_pnl = pnl_data.get('unrealized_pnl', 0)
        portfolio_value = pnl_data.get('portfolio_value', 0)
        
        pnl_emoji = '📈' if total_pnl >= 0 else '📉'
        daily_emoji = '🟢' if daily_pnl >= 0 else '🔴'
        
        return f"""
{pnl_emoji} <b>損益報告</b>

💰 <b>投資組合總值:</b>${portfolio_value:,.2f}
📊 <b>總損益:</b>${total_pnl:,.2f}
{daily_emoji} <b>今日損益:</b>${daily_pnl:,.2f}
⏳ <b>未實現損益:</b>${unrealized_pnl:,.2f}
📅 <b>更新時間:</b>{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
"""
    
    @staticmethod
    def risk_alert(alert_data: Dict) -> str:
        """風險警告通知"""
        
        alert_type = alert_data.get('type', 'Unknown')
        message = alert_data.get('message', 'Unknown risk detected')
        severity = alert_data.get('severity', 'medium')
        
        severity_emoji = {
            'low': '🟡',
            'medium': '🟠', 
            'high': '🔴',
            'critical': '🚨'
        }.get(severity, '⚠️')
        
        return f"""
{severity_emoji} <b>風險警告</b>

⚠️ <b>類型:</b>{alert_type}
📝 <b>描述:</b>{message}
🎚️ <b>嚴重程度:</b>{severity.upper()}
🕒 <b>時間:</b>{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

請立即檢查系統狀況!
"""
    
    @staticmethod
    def system_status(status_data: Dict) -> str:
        """系統狀態通知"""
        
        status = status_data.get('status', 'unknown')
        components = status_data.get('components', {})
        uptime = status_data.get('uptime', 'Unknown')
        
        status_emoji = {
            'healthy': '🟢',
            'warning': '🟡',
            'error': '🔴',
            'offline': '⚫'
        }.get(status, '❓')
        
        component_status = '\n'.join([
            f"  • {name}: {'✅' if health else '❌'}"
            for name, health in components.items()
        ])
        
        return f"""
{status_emoji} <b>系統狀態報告</b>

🖥️ <b>總體狀態:</b>{status.upper()}
⏱️ <b>運行時間:</b>{uptime}

<b>組件狀態:</b>
{component_status}

🕒 <b>檢查時間:</b>{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
"""

    @staticmethod
    def daily_summary(summary_data: Dict) -> str:
        """每日總結報告"""
        
        trades_count = summary_data.get('trades_count', 0)
        total_volume = summary_data.get('total_volume', 0)
        pnl = summary_data.get('pnl', 0)
        win_rate = summary_data.get('win_rate', 0)
        top_performer = summary_data.get('top_performer', 'N/A')
        
        pnl_emoji = '📈' if pnl >= 0 else '📉'
        
        return f"""
📊 <b>每日交易總結</b>

🔢 <b>交易次數:</b>{trades_count}
💵 <b>交易量:</b>${total_volume:,.2f}
{pnl_emoji} <b>淨損益:</b>${pnl:,.2f}
🎯 <b>勝率:</b>{win_rate:.1%}
🏆 <b>最佳標的:</b>{top_performer}

📅 <b>日期:</b>{datetime.now().strftime('%Y-%m-%d')}
"""

3. 通知管理器

# src/notifications/notification_manager.py
import asyncio
import logging
from typing import Dict, List, Optional
from enum import Enum

class NotificationLevel(Enum):
    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"

class NotificationManager:
    """統一通知管理器"""
    
    def __init__(self, telegram_bot: TelegramBot):
        self.telegram_bot = telegram_bot
        self.templates = MessageTemplates()
        self.logger = logging.getLogger(__name__)
        
        # 通知設定
        self.notification_settings = {
            NotificationLevel.INFO: True,
            NotificationLevel.WARNING: True,
            NotificationLevel.ERROR: True,
            NotificationLevel.CRITICAL: True
        }
        
        # 重複通知防護
        self.recent_notifications = {}
        self.duplicate_threshold = 300  # 5分鐘內相同通知視為重複
    
    async def send_trading_signal(self, signal_data: Dict):
        """發送交易信號通知"""
        message = self.templates.trading_signal(signal_data)
        await self._send_notification(message, NotificationLevel.INFO)
    
    async def send_order_executed(self, order_data: Dict):
        """發送訂單執行通知"""
        message = self.templates.order_executed(order_data)
        await self._send_notification(message, NotificationLevel.INFO)
    
    async def send_pnl_update(self, pnl_data: Dict):
        """發送損益更新通知"""
        message = self.templates.pnl_update(pnl_data)
        await self._send_notification(message, NotificationLevel.INFO)
    
    async def send_risk_alert(self, alert_data: Dict):
        """發送風險警告"""
        severity = alert_data.get('severity', 'medium')
        level = {
            'low': NotificationLevel.INFO,
            'medium': NotificationLevel.WARNING,
            'high': NotificationLevel.ERROR,
            'critical': NotificationLevel.CRITICAL
        }.get(severity, NotificationLevel.WARNING)
        
        message = self.templates.risk_alert(alert_data)
        await self._send_notification(message, level, allow_duplicate=False)
    
    async def send_system_status(self, status_data: Dict):
        """發送系統狀態通知"""
        message = self.templates.system_status(status_data)
        await self._send_notification(message, NotificationLevel.INFO)
    
    async def send_daily_summary(self, summary_data: Dict):
        """發送每日總結"""
        message = self.templates.daily_summary(summary_data)
        await self._send_notification(message, NotificationLevel.INFO)
    
    async def send_custom_message(self, message: str, level: NotificationLevel = NotificationLevel.INFO):
        """發送自訂訊息"""
        await self._send_notification(message, level)
    
    async def _send_notification(self, message: str, level: NotificationLevel, 
                                allow_duplicate: bool = True):
        """內部通知發送方法"""
        
        # 檢查通知等級是否啟用
        if not self.notification_settings.get(level, True):
            return
        
        # 檢查重複通知
        if not allow_duplicate and self._is_duplicate_notification(message):
            self.logger.debug("Duplicate notification blocked")
            return
        
        # 設定優先級
        priority = {
            NotificationLevel.INFO: "normal",
            NotificationLevel.WARNING: "high",
            NotificationLevel.ERROR: "high",
            NotificationLevel.CRITICAL: "urgent"
        }.get(level, "normal")
        
        try:
            await self.telegram_bot.send_message(message, priority=priority)
            self.logger.info(f"Notification sent: {level.value}")
            
            # 記錄通知歷史
            self._record_notification(message)
            
        except Exception as e:
            self.logger.error(f"Failed to send notification: {e}")
    
    def _is_duplicate_notification(self, message: str) -> bool:
        """檢查是否為重複通知"""
        message_hash = hash(message)
        now = datetime.now()
        
        if message_hash in self.recent_notifications:
            last_sent = self.recent_notifications[message_hash]
            if (now - last_sent).total_seconds() < self.duplicate_threshold:
                return True
        
        return False
    
    def _record_notification(self, message: str):
        """記錄通知歷史"""
        message_hash = hash(message)
        self.recent_notifications[message_hash] = datetime.now()
        
        # 清理舊記錄
        cutoff_time = datetime.now() - timedelta(seconds=self.duplicate_threshold * 2)
        self.recent_notifications = {
            msg_hash: timestamp
            for msg_hash, timestamp in self.recent_notifications.items()
            if timestamp > cutoff_time
        }
    
    def set_notification_level(self, level: NotificationLevel, enabled: bool):
        """設定通知等級開關"""
        self.notification_settings[level] = enabled
        self.logger.info(f"Notification level {level.value} {'enabled' if enabled else 'disabled'}")

整合到交易系統

在交易引擎中集成通知

# src/trading_engine/core.py (更新版本)
class TradingEngine:
    def __init__(self, config: Config):
        # ... 其他初始化程式碼 ...
        
        # 初始化通知系統
        telegram_config = TelegramConfig(
            bot_token=config.telegram.bot_token,
            chat_id=config.telegram.chat_id
        )
        self.telegram_bot = TelegramBot(telegram_config)
        self.notification_manager = NotificationManager(self.telegram_bot)
    
    async def start(self):
        """啟動交易引擎"""
        try:
            # 啟動通知系統
            await self.telegram_bot.start()
            
            # 發送系統啟動通知
            await self.notification_manager.send_custom_message(
                "🚀 <b>交易系統啟動</b>\n\n系統已成功啟動,開始監控市場...",
                NotificationLevel.INFO
            )
            
            # ... 其他啟動程式碼 ...
            
        except Exception as e:
            await self.notification_manager.send_custom_message(
                f"❌ <b>系統啟動失敗</b>\n\n錯誤: {str(e)}",
                NotificationLevel.CRITICAL
            )
            raise
    
    async def execute_trade(self, signal):
        """執行交易並發送通知"""
        try:
            # 執行交易
            order_result = await self.order_manager.execute_order(signal)
            
            if order_result.success:
                # 發送成功通知
                await self.notification_manager.send_order_executed({
                    'symbol': signal.symbol,
                    'side': signal.side,
                    'size': signal.size,
                    'price': order_result.price,
                    'order_id': order_result.order_id
                })
            else:
                # 發送失敗通知
                await self.notification_manager.send_custom_message(
                    f"⚠️ <b>交易執行失敗</b>\n\n標的: {signal.symbol}\n錯誤: {order_result.error}",
                    NotificationLevel.ERROR
                )
                
        except Exception as e:
            await self.notification_manager.send_custom_message(
                f"❌ <b>交易執行異常</b>\n\n{str(e)}",
                NotificationLevel.CRITICAL
            )

定期報告任務

# src/tasks/reporting.py
import asyncio
from datetime import datetime, time

class ReportingTasks:
    """定期報告任務"""
    
    def __init__(self, notification_manager: NotificationManager, 
                 portfolio_manager, trading_engine):
        self.notification_manager = notification_manager
        self.portfolio_manager = portfolio_manager
        self.trading_engine = trading_engine
        
    async def start_scheduled_reports(self):
        """啟動定時報告"""
        
        # 每日總結報告 (每天晚上23:00)
        asyncio.create_task(self._daily_summary_task())
        
        # 每小時損益報告
        asyncio.create_task(self._hourly_pnl_task())
        
        # 系統健康檢查 (每30分鐘)
        asyncio.create_task(self._health_check_task())
    
    async def _daily_summary_task(self):
        """每日總結任務"""
        while True:
            now = datetime.now()
            
            # 計算到晚上23:00的秒數
            target_time = now.replace(hour=23, minute=0, second=0, microsecond=0)
            if target_time <= now:
                target_time = target_time.replace(day=target_time.day + 1)
            
            sleep_seconds = (target_time - now).total_seconds()
            await asyncio.sleep(sleep_seconds)
            
            # 生成每日總結
            summary_data = await self.portfolio_manager.get_daily_summary()
            await self.notification_manager.send_daily_summary(summary_data)
    
    async def _hourly_pnl_task(self):
        """每小時損益報告"""
        while True:
            await asyncio.sleep(3600)  # 1小時
            
            pnl_data = await self.portfolio_manager.get_pnl_summary()
            await self.notification_manager.send_pnl_update(pnl_data)
    
    async def _health_check_task(self):
        """系統健康檢查"""
        while True:
            await asyncio.sleep(1800)  # 30分鐘
            
            health_status = await self.trading_engine.get_system_health()
            await self.notification_manager.send_system_status(health_status)

小結

今天我們完成了整個量化交易系統的最後一塊拼圖 - Telegram 通知系統,就像為農場安裝了現代化的通訊設備。

30天學習回顧:

  1. AWS 基礎設施 (Day 1-10):建立了穩固的雲端基礎
  2. CI/CD 自動化 (Day 11-15):實現了現代化的開發流程
  3. 量化交易理論 (Day 16-25):掌握了金融科技核心概念
  4. 系統實作 (Day 26-31):建立了完整的交易系統

Telegram 通知系統特色:

  • 即時交易通知
  • 智能重複檢測
  • 多等級警告系統
  • 美觀的訊息格式
  • 速率限制保護

完整系統架構優勢:

  • 雲原生設計,高可用性
  • 模組化架構,易於維護
  • 自動化部署,高效開發
  • 全面監控,及時響應

從一個鄉下小孩的角度,我們成功建立了一個現代化的量化交易系統,就像把傳統農業轉變為智慧農業一樣。這個系統不僅能自動化執行交易策略,還能即時監控並通知重要事件。

學習成果:

  • 掌握了 AWS 雲端服務
  • 學會了現代化 CI/CD 流程
  • 理解了量化交易核心概念
  • 具備了系統性思維和實作能力

這30天的學習旅程結束了,但真正的量化交易之路才剛開始。記住爸爸說過的話:「種田要有耐心,技術要持續學習」。量化交易也是如此,需要不斷學習、優化和適應市場變化。

願這個系列能幫助大家踏上成功的量化交易之路!🚀


系列完結 - 感謝大家30天的陪伴!


上一篇
Day 30: Github Runner - Build image
系列文
小資族的量化交易 10131
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言